package com.bodystm.server;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;

import com.bodystm.algorithm.DllInterface;
import com.bodystm.algorithm.EcgHR;
import com.bodystm.algorithm.IirFilter;
import com.bodystm.algorithm.MathTools;
import com.bodystm.algorithm.MedFilter;
import com.bodystm.bean.Bed;
import com.bodystm.bean.Ecg;
import com.bodystm.bean.datadeal.BodystmProduct;
import com.bodystm.bean.datadeal.EcgAnalysis;
import com.bodystm.bean.datadeal.OximetryAnalysis;
import com.bodystm.bean.datadeal.EcgAnalysis.EcgChannel;
import com.bodystm.config.PublicSetting;
import com.bodystm.manager.DataHandlerManager;
import com.bodystm.util.MathUtils;
import com.bodystm.web.DeviceSettings;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import redis.clients.jedis.Jedis;
/**
 * Consumer thread for the ECG data of one device.
 *
 * <p>The thread blocks on a Redis list (the key is the byte array handed to the
 * constructor), decodes every popped packet, runs the samples through the native
 * filter / heart-rate / resampling routines exposed by {@link DllInterface} and
 * pushes the resulting waveform as a text frame to every WebSocket channel
 * registered for the device MAC.
 *
 * <pre>
 * ScheduleMQ4Ecg consumer = new ScheduleMQ4Ecg(redisKey); // redisKey = mac + flag
 * consumer.start();
 * </pre>
 *
 * <p>The native handles are created when the object is constructed and released
 * when {@link #run()} terminates.
 *
 * @author ehl
 */
public class ScheduleMQ4Ecg extends Thread {
	/** Popped elements shorter than this are skipped (the list reply also contains the key). */
	private static final int MIN_PACKET_LENGTH=10;
	/** Offset of the first sample point inside a packet. */
	private static final int SAMPLE_DATA_OFFSET=8;
	/** Bytes occupied by one sample point (three channels of three bytes each). */
	private static final int BYTES_PER_POINT=9;
	/** Bytes occupied by one channel value. */
	private static final int BYTES_PER_CHANNEL=3;
	/** 2^23: raw values at or above this are negative (assumes 24-bit two's complement samples). */
	private static final double ADC_SIGN_THRESHOLD=8388608.0;
	/** 2^24: subtracted from a raw value to obtain its negative reading. */
	private static final double ADC_FULL_RANGE=16777216.0;

	Logger logger=Logger.getLogger(ScheduleMQ4Ecg.class);
	private byte[] mac;
	private String strMac;
	private byte[] rediskey;
	//=============== algorithms are called through the native DLL ===============
	// One median-filter handle is shared by the three channels; the channel is
	// selected by the second argument of MedFilter_B (presumably a channel index).
	private long medFilter1=DllInterface.INSTANCE.MedFilter_C();
	// One low-pass filter handle per channel (I, II, V).
	private long  LowPassFilter1_C=DllInterface.INSTANCE.LowPassFilter4_C();
	private long  LowPassFilter2_C=DllInterface.INSTANCE.LowPassFilter4_C();
	private long  LowPassFilter5_C=DllInterface.INSTANCE.LowPassFilter4_C();
	// Handle of the native resampler.
	private long mathTools=DllInterface.INSTANCE.Resampling_C();
	// Java-side helper used to average the heart rate returned by the native counter.
	private EcgHR ecgHR=new EcgHR();
	// Handle of the native heart-rate counter.
	private long ecgHR_L=DllInterface.INSTANCE.Count_Ecg_Hr_C();
	//=============================================================================
	private Ecg ecg=null;
	private EcgAnalysis ecgAnalysis=new EcgAnalysis();
	private Bed bed=null;
	private int simplifyArg=ecgAnalysis.getSimplifyArg();

	/**
	 * Creates the consumer for one device.
	 *
	 * @param data the Redis key to consume (mac + flag); its first six bytes are
	 *             taken as the device MAC
	 */
	public ScheduleMQ4Ecg(byte[] data){
		this.mac=ArrayUtils.subarray(data, 0, 6);
		this.strMac=ByteUtil.convertByteToString(mac);
		this.rediskey=data;
		String analysisKey=strMac+BodystmProduct.BodystmECG.ordinal();
		// Carry the channel selection over from an analysis registered earlier for
		// this device. A lookup by key is used on purpose: on Hashtable and
		// ConcurrentHashMap the legacy contains(Object) tests VALUES, so calling it
		// with a key never matches.
		Object previous=DataHandlerManager.hasDataAynalysis.get(analysisKey);
		if (previous instanceof EcgAnalysis) {
			ecgAnalysis.channelType=((EcgAnalysis)previous).channelType;
		}
		DataHandlerManager.updateHasDataAynalysis(analysisKey, ecgAnalysis);
	}

	/**
	 * Consumes packets from Redis until the thread is interrupted or an exception
	 * occurs, then closes the Redis connection and releases the native handles.
	 */
	@Override
	public void run(){
		Jedis jedis=null;
		try {
			jedis = RedisManager.getJedis();
			if (jedis==null) {
				// Nothing can be consumed without a connection; the finally block
				// still releases the native handles.
				logger.error("redis服务连接失败！");
				return;
			}
			// Discard stale data so a reconnect is not affected by old samples.
			// NOTE(review): this blocks until an element is available and removes
			// only ONE element; confirm whether the whole list should be cleared.
			jedis.brpop(0, rediskey);
			while(!isInterrupted()) {
				refreshBedAndEcg();
				// Blocking pop: timeout 0 waits until the list has data.
				List<byte[]> list = jedis.brpop(0, rediskey);
				// Record when data was last received for this bed.
				if (bed!=null) {
					bed.setEcgUpdateMs(System.currentTimeMillis());
				}
				if (list==null) {
					continue;
				}
				ArrayList<Channel> channels=null;
				if (WebSocketManager.wsServer!=null) {
					channels = WebSocketManager.wsServer.channelsMap4Ecg2.get(strMac);
				}
				for(byte[] bs : list) {
					// Skip the key that is returned together with the value.
					if (bs.length<MIN_PACKET_LENGTH) {
						continue;
					}
					// The frame is always built, even without listeners, because
					// building it also feeds the filters and the heart-rate counter.
					broadcast(channels, buildFrame(bs));
				}
			}
		} catch (Exception e) {
			logger.error("ECG consumer for "+strMac+" terminated by an exception", e);
			// If the exception was caused by an interruption the interrupt flag has
			// been cleared; restore it so that callers can still observe it.
			Thread.currentThread().interrupt();
		} finally {
			if (jedis!=null) {
				RedisManager.close(jedis);
			}
			releaseNativeHandles();
		}
	}

	/**
	 * Looks the bed up again on every iteration so that a bed re-assigned to
	 * another MAC in the back office is not updated through a stale reference.
	 * An {@link Ecg} is created for an occupied bed that has none yet.
	 * (Attaching {@code ecgAnalysis} to the Ecg was removed on 2018-06-24 to fix
	 * an intermittent "cannot change channel" bug.)
	 */
	private void refreshBedAndEcg(){
		ecg=null;
		bed = DeviceSettings.hasBeds.get(strMac);
		if (null!=bed&&!bed.getStatus().equals(PublicSetting.BED_STATUS_EMPTY)) {
			ecg=bed.getEcg();
			if (ecg==null) {
				ecg=new Ecg();
				bed.setEcg(ecg);
			}
		}
	}

	/**
	 * Decodes one packet, updates the heart rate and builds the WebSocket text
	 * frame {@code /<mac>/ECG/<v1>,<v2>,...} for the currently selected channel.
	 *
	 * @param bs raw packet popped from Redis
	 * @return the text frame; it carries no values when resampling produced none
	 */
	private String buildFrame(byte[] bs){
		StringBuilder sBuilder=new StringBuilder();
		sBuilder.append("/").append(strMac).append("/ECG/");
		for(int i=0;i<EcgAnalysis.ECG_POINTS_PER_PACK;i++){
			int base=SAMPLE_DATA_OFFSET+BYTES_PER_POINT*i;
			//Channel I
			double value1=readSample(bs, base);
			value1=DllInterface.INSTANCE.MedFilter_B(value1, 0, medFilter1);
			value1=DllInterface.INSTANCE.LowPassFilter4_B(value1, LowPassFilter1_C);
			//Channel II
			double value2=readSample(bs, base+BYTES_PER_CHANNEL);
			value2=DllInterface.INSTANCE.MedFilter_B(value2, 1, medFilter1);
			value2=DllInterface.INSTANCE.LowPassFilter4_B(value2, LowPassFilter2_C);
			//Channel V
			double value5=readSample(bs, base+2*BYTES_PER_CHANNEL);
			value5=DllInterface.INSTANCE.MedFilter_B(value5, 2, medFilter1);
			value5=DllInterface.INSTANCE.LowPassFilter4_B(value5, LowPassFilter5_C);
			// The derived leads were computed here but never used, so they are no
			// longer calculated. Formulas kept for reference:
			//   III = II - I            aVR = -(I + II) / 2
			//   aVL = I - II / 2        aVF = II - I / 2

			// Heart rate: native counter fed with lead I, averaged on the Java side.
			int unHR = DllInterface.INSTANCE.Count_Ecg_Hr_B((float)value1, ecgHR_L);
			unHR =ecgHR.getAvgHr(unHR);
			if (ecg!=null&&ecg.getHr()!=unHR) {
				ecg.setHr(unHR);
			}
			// Store the computed heart rate in Redis.
			// NOTE(review): the key "hr" is not device specific and is written once
			// per sample point; confirm that this is intended.
			RedisManager.set("hr", String.valueOf(unHR));

			// Resampling: the return value is used as "number of output points - 1"
			// (a negative value therefore emits nothing for this input point).
			int nResamplingRet=DllInterface.INSTANCE.Resampling_B((float)1.0/EcgAnalysis.SAMPLE_RATE, (float)1.0/(ecgAnalysis.npTargetSampleRate), value1,0, mathTools);

			// Pick the value of the channel the client asked for.
			double selected;
			if (ecgAnalysis.channelType==EcgChannel.标1通道) {
				selected=value1;
			}else if (ecgAnalysis.channelType==EcgChannel.标2通道) {
				selected=value2;
			}else {
				selected=value5;
			}
			double scaled=selected/simplifyArg;// scale factor is configurable
			for (int j = 0; j <= nResamplingRet; j++) {
				sBuilder.append(scaled).append(",");
			}
		}
		// Drop the trailing separator. When no value was appended the last
		// character is the '/' of the prefix and must be kept.
		int last=sBuilder.length()-1;
		if (sBuilder.charAt(last)==',') {
			sBuilder.deleteCharAt(last);
		}
		return sBuilder.toString();
	}

	/**
	 * Reads one three-byte channel value and converts it to a signed number.
	 *
	 * @param bs     raw packet
	 * @param offset index of the first byte of the value
	 * @return the signed sample value
	 */
	private static double readSample(byte[] bs, int offset){
		double value=Double.parseDouble(MathUtils.binary(ArrayUtils.subarray(bs, offset, offset+BYTES_PER_CHANNEL), 10));
		// Assumes 24-bit two's complement: 0x800000 itself is the most negative
		// value, hence ">=" rather than ">".
		if(value >= ADC_SIGN_THRESHOLD) {
			value -= ADC_FULL_RANGE;
		}
		return value;
	}

	/**
	 * Sends a text frame to every open channel in the list.
	 *
	 * @param channels channels registered for this device, may be {@code null}
	 * @param msg      frame content
	 */
	private void broadcast(List<Channel> channels, String msg){
		if (channels==null||channels.isEmpty()) {
			return;
		}
		for(Channel ch:channels){
			if (ch!=null&&ch.isOpen()) {
				ch.writeAndFlush(new TextWebSocketFrame(msg));
			}
		}
	}

	/** Releases every native handle created by the field initializers. */
	private void releaseNativeHandles(){
		DllInterface.INSTANCE.MedFilter_D(medFilter1);
		DllInterface.INSTANCE.LowPassFilter4_D(LowPassFilter1_C);
		DllInterface.INSTANCE.LowPassFilter4_D(LowPassFilter2_C);
		DllInterface.INSTANCE.LowPassFilter4_D(LowPassFilter5_C);
		DllInterface.INSTANCE.Resampling_D(mathTools);
		DllInterface.INSTANCE.Count_Ecg_Hr_D(ecgHR_L);
	}
}
